Spark 知道 DataFrame 的分区键吗?
Does Spark know the partitioning key of a DataFrame?
我想知道 Spark 是否知道 parquet 文件的分区键并使用此信息来避免随机播放。
上下文:
运行 Spark 2.0.1 运行 本地 SparkSession。我有一个 csv 数据集,我将其作为 parquet 文件保存在我的磁盘上,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
我正在按 numerocarte
列创建 42 个分区。这应该将多个 numerocarte
分组到同一个分区。我不想在 write
时执行 partitionBy("numerocarte") 因为我不希望每张卡有一个分区。将有数百万。
之后,我在另一个脚本中读取了这个 SomeFile.parquet
实木复合地板文件并对其进行了一些操作。特别是我 运行 一个 window function
在它上面,分区是在镶木地板文件重新分区的同一列上完成的。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
在 read
之后,我可以看到 repartition
按预期工作,DataFrame df2
有 42 个分区,每个分区都有不同的卡片。
问题:
- Spark 知道数据帧
df2
是按列 numerocarte
分区的吗?
- 如果它知道,那么window函数中就不会有洗牌。真的吗?
- 如果它不知道,它会在window函数中进行随机播放。真的吗?
- 如果它不知道,我该如何告诉 Spark 数据已经按正确的列进行了分区?
- 如何检查
DataFrame
的分区键?有这个命令吗?我知道如何检查分区数但如何查看分区键?
- 当我在每个步骤后打印文件中的分区数时,我在
read
之后有 42 个分区,在 withColumn
之后有 200 个分区,这表明 Spark 重新分区了我的 DataFrame
。
- 如果我有两个使用同一列重新分区的不同表,连接会使用该信息吗?
Does Spark know that the dataframe df2 is partitioned by column numerocarte?
没有。
If it does not know, how do I tell Spark the data is already partitioned by the right column?
你不知道。仅仅因为您保存了已打乱的数据,并不意味着它将加载相同的拆分。
How can I check a partitioning key of DataFrame?
加载数据后没有分区键,但您可以检查 queryExecution
for Partitioner
。
实践中:
- 如果你想支持高效的按键下推,使用
DataFrameWriter
的partitionBy
方法。
- 如果您希望对连接优化提供有限支持,请使用
bucketBy
元存储和持久表。
有关详细示例,请参阅 。
我正在回答我自己的问题以供将来参考。
根据@user8371915 的建议,bucketBy 成功了!
我正在保存我的 DataFrame df
:
df.write
.bucketBy(250, "userid")
.saveAsTable("myNewTable")
然后当我需要加载这个时 table:
val df2 = spark.sql("SELECT * FROM myNewTable")
val w = Window.partitionBy("userid")
val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain
我确认,当我在 userid
分区的 df2
上执行 window 函数时,没有随机播放!谢谢@user8371915!
我在调查中学到的一些东西
- myNewTable 看起来像一个普通的 parquet 文件,但它不是。你可以用
spark.read.format("parquet").load("path/to/myNewTable")
正常读取它,但是这样创建的 DataFrame
将不会保留原来的分区!您必须使用 spark.sql
select
才能正确分区 DataFrame
.
- 您可以使用
spark.sql("describe formatted myNewTable").collect.foreach(println)
查看 table 内部。这将告诉您哪些列用于分桶以及有多少个分桶。
- Window 利用分区的函数和连接通常也需要排序。您可以在写入时使用
.sortBy()
对存储桶中的数据进行排序,排序也将保留在配置单元 table 中。 df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
- 在本地模式下工作时,table
myNewTable
会保存到我本地 Scala SBT 项目中的 spark-warehouse
文件夹中。通过spark-submit
用mesos集群模式保存时,保存到hive仓库。对我来说它位于 /user/hive/warehouse
.
- 在执行
spark-submit
时,您需要在 SparkSession
中添加两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083")
和 .enableHiveSupport()
。否则您创建的配置单元 table 将不可见。
- 如果您想将 table 保存到特定数据库,请在存储之前执行
spark.sql("USE your database")
。
更新 05-02-2018
我在使用 spark bucketing 和创建 Hive table 时遇到了一些问题。请参考为什么Spark saveAsTable with bucketBy创建数千个文件中的问题、回复和评论?
我想知道 Spark 是否知道 parquet 文件的分区键并使用此信息来避免随机播放。
上下文:
运行 Spark 2.0.1 运行 本地 SparkSession。我有一个 csv 数据集,我将其作为 parquet 文件保存在我的磁盘上,如下所示:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
我正在按 numerocarte
列创建 42 个分区。这应该将多个 numerocarte
分组到同一个分区。我不想在 write
时执行 partitionBy("numerocarte") 因为我不希望每张卡有一个分区。将有数百万。
之后,我在另一个脚本中读取了这个 SomeFile.parquet
实木复合地板文件并对其进行了一些操作。特别是我 运行 一个 window function
在它上面,分区是在镶木地板文件重新分区的同一列上完成的。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
在 read
之后,我可以看到 repartition
按预期工作,DataFrame df2
有 42 个分区,每个分区都有不同的卡片。
问题:
- Spark 知道数据帧
df2
是按列numerocarte
分区的吗? - 如果它知道,那么window函数中就不会有洗牌。真的吗?
- 如果它不知道,它会在window函数中进行随机播放。真的吗?
- 如果它不知道,我该如何告诉 Spark 数据已经按正确的列进行了分区?
- 如何检查
DataFrame
的分区键?有这个命令吗?我知道如何检查分区数但如何查看分区键? - 当我在每个步骤后打印文件中的分区数时,我在
read
之后有 42 个分区,在withColumn
之后有 200 个分区,这表明 Spark 重新分区了我的DataFrame
。 - 如果我有两个使用同一列重新分区的不同表,连接会使用该信息吗?
Does Spark know that the dataframe df2 is partitioned by column numerocarte?
没有。
If it does not know, how do I tell Spark the data is already partitioned by the right column?
你不知道。仅仅因为您保存了已打乱的数据,并不意味着它将加载相同的拆分。
How can I check a partitioning key of DataFrame?
加载数据后没有分区键,但您可以检查 queryExecution
for Partitioner
。
实践中:
- 如果你想支持高效的按键下推,使用
DataFrameWriter
的partitionBy
方法。 - 如果您希望对连接优化提供有限支持,请使用
bucketBy
元存储和持久表。
有关详细示例,请参阅
我正在回答我自己的问题以供将来参考。
根据@user8371915 的建议,bucketBy 成功了!
我正在保存我的 DataFrame df
:
df.write
.bucketBy(250, "userid")
.saveAsTable("myNewTable")
然后当我需要加载这个时 table:
val df2 = spark.sql("SELECT * FROM myNewTable")
val w = Window.partitionBy("userid")
val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain
我确认,当我在 userid
分区的 df2
上执行 window 函数时,没有随机播放!谢谢@user8371915!
我在调查中学到的一些东西
- myNewTable 看起来像一个普通的 parquet 文件,但它不是。你可以用
spark.read.format("parquet").load("path/to/myNewTable")
正常读取它,但是这样创建的DataFrame
将不会保留原来的分区!您必须使用spark.sql
select
才能正确分区DataFrame
. - 您可以使用
spark.sql("describe formatted myNewTable").collect.foreach(println)
查看 table 内部。这将告诉您哪些列用于分桶以及有多少个分桶。 - Window 利用分区的函数和连接通常也需要排序。您可以在写入时使用
.sortBy()
对存储桶中的数据进行排序,排序也将保留在配置单元 table 中。df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
- 在本地模式下工作时,table
myNewTable
会保存到我本地 Scala SBT 项目中的spark-warehouse
文件夹中。通过spark-submit
用mesos集群模式保存时,保存到hive仓库。对我来说它位于/user/hive/warehouse
. - 在执行
spark-submit
时,您需要在SparkSession
中添加两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083")
和.enableHiveSupport()
。否则您创建的配置单元 table 将不可见。 - 如果您想将 table 保存到特定数据库,请在存储之前执行
spark.sql("USE your database")
。
更新 05-02-2018
我在使用 spark bucketing 和创建 Hive table 时遇到了一些问题。请参考为什么Spark saveAsTable with bucketBy创建数千个文件中的问题、回复和评论?